Flink Learning (7): Flink Tumbling Windows


2024-06-11 16:04 | Source: compiled from the web

Contents
Preface
1.Tumbling-ProcessingTime-Window
2.Tumbling-ProcessingTime-Windows-Offset
3.Tumbling-Count-Window
4.Tumbling-EventTime-Window

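Preface

Window operators are the core of processing unbounded streams in Flink. A window splits the stream into finite "buckets"; each bucket can be bounded by time, by element count, and so on, and a computation is then applied to the data in each bucket.

Window types:
Tumbling window: splits the data into windows of a fixed length. Windows do not overlap, so every element belongs to exactly one window.
Sliding window: the window length is fixed and the slide interval determines where each window starts. When the slide is shorter than the length, windows overlap and one element can belong to several windows.
Session window: has no fixed length. A window closes once no new data has arrived for a configured gap, and the next element opens a new window.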
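As a rough illustration of how the three window types assign elements, here is a plain-Java sketch with no Flink dependency. The class and method names (WindowAssignmentSketch, tumblingStart, slidingStarts, sessions) are hypothetical and chosen only for this example; the code assumes windows aligned to timestamp 0 and is a simplification, not Flink's actual assigner code.

```java
import java.util.ArrayList;
import java.util.List;

final class WindowAssignmentSketch {

    /** Start of the single tumbling window of length size that contains ts. */
    static long tumblingStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    /** Starts of every sliding window (length size, advancing by slide) that contains ts. */
    static List<Long> slidingStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    /** Groups sorted timestamps into sessions; a pause longer than gap opens a new session. */
    static List<List<Long>> sessions(long[] sortedTs, long gap) {
        List<List<Long>> result = new ArrayList<>();
        for (int i = 0; i < sortedTs.length; i++) {
            if (i == 0 || sortedTs[i] - sortedTs[i - 1] > gap) {
                result.add(new ArrayList<>());
            }
            result.get(result.size() - 1).add(sortedTs[i]);
        }
        return result;
    }

    public static void main(String[] args) {
        // One tumbling window [6000, 9000) contains 7000
        System.out.println(tumblingStart(7_000L, 3_000L));         // prints 6000
        // Three overlapping sliding windows contain 7000
        System.out.println(slidingStarts(7_000L, 3_000L, 1_000L)); // prints [7000, 6000, 5000]
        // The 8s pause between 2000 and 10000 exceeds the 3s gap
        System.out.println(sessions(new long[] {0L, 1_000L, 2_000L, 10_000L, 10_500L}, 3_000L));
        // prints [[0, 1000, 2000], [10000, 10500]]
    }
}
```

The tumbling case returns exactly one window per element, while the sliding case returns size / slide windows, which is why sliding windows can count the same element several times.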

This article walks through examples of tumbling windows.

1.Tumbling-ProcessingTime-Window

A tumbling window based on processing time (ProcessingTime).

(1) Custom data source

Emits one integer per second.

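import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class IntegerSource implements SourceFunction<Integer> {
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean running = true;
    private int i = 0;

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        while (running) {
            ctx.collect(i++);
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}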

(2) Example

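import java.util.Date;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

@Test
public void tumblingProcessingTimeWindowsTest() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
    DataStreamSource<Integer> source = env.addSource(new IntegerSource());
    // Processing-time tumbling window with a length of 3s
    source.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(3)))
            .process(new ProcessAllWindowFunction<Integer, Integer, TimeWindow>() {
                @Override
                public void process(Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {
                    int sum = 0;
                    for (Integer next : elements) {
                        sum += next;
                        System.out.println("Element: " + next + ", processing time: " + new Date());
                    }
                    // Emit the sum of all elements in this window
                    out.collect(sum);
                }
            })
            .print();
    env.execute("TumblingProcessingTimeWindows");
}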

Result:

Element: 0, processing time: Sat Apr 16 21:39:15 CST 2022
Element: 1, processing time: Sat Apr 16 21:39:15 CST 2022
3> 1
Element: 2, processing time: Sat Apr 16 21:39:18 CST 2022
Element: 3, processing time: Sat Apr 16 21:39:18 CST 2022
Element: 4, processing time: Sat Apr 16 21:39:18 CST 2022
4> 9
Element: 5, processing time: Sat Apr 16 21:39:21 CST 2022
Element: 6, processing time: Sat Apr 16 21:39:21 CST 2022
Element: 7, processing time: Sat Apr 16 21:39:21 CST 2022
5> 18

2.Tumbling-ProcessingTime-Windows-Offset

A processing-time tumbling window with an offset.

Example:

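import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

@Test
public void tumblingProcessingTimeWindowsWithOffsetTest() throws Exception {
    // Note: "zzz" prints the time-zone abbreviation (e.g. CST); use "SSS" for milliseconds
    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.zzz");
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
    DataStreamSource<Integer> source = env.addSource(new IntegerSource());
    // Processing-time tumbling window with a length of 3s and an offset of 2s
    source.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(3), Time.seconds(2)))
            .process(new ProcessAllWindowFunction<Integer, Integer, TimeWindow>() {
                @Override
                public void process(Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {
                    int sum = 0;
                    for (Integer next : elements) {
                        sum += next;
                        System.out.println("Element: " + next + ", processing time: " + format.format(new Date()));
                    }
                    out.collect(sum);
                }
            })
            .print();
    env.execute("tumbling window");
}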

Result:

Element: 0, processing time: 2022-04-16 21:43:02.CST
Element: 1, processing time: 2022-04-16 21:43:02.CST
Element: 2, processing time: 2022-04-16 21:43:02.CST
8> 3
Element: 3, processing time: 2022-04-16 21:43:05.CST
Element: 4, processing time: 2022-04-16 21:43:05.CST
Element: 5, processing time: 2022-04-16 21:43:05.CST
1> 12
Element: 6, processing time: 2022-04-16 21:43:08.CST
Element: 7, processing time: 2022-04-16 21:43:08.CST
Element: 8, processing time: 2022-04-16 21:43:08.CST
2> 21

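With the offset set to 2s, the window boundaries are shifted by 2s. Without the offset, 3s windows are aligned to the epoch and the elements would have been processed at 2022-04-16 21:43:03 and 2022-04-16 21:43:06; with the offset, processing moves to 2022-04-16 21:43:05 and 2022-04-16 21:43:08.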
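The shift can be checked with a little arithmetic. The sketch below is plain Java with hypothetical names (TumblingOffsetSketch, windowStart); it is meant to mirror the kind of calculation Flink performs in TimeWindow.getWindowStartWithOffset, under the assumption that windows are aligned to the epoch plus the offset. It is an illustration, not a copy of Flink's code.

```java
final class TumblingOffsetSketch {

    /** Start of the tumbling window containing timestamp, for windows shifted by offset (all in ms). */
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - Math.floorMod(timestamp - offset, windowSize);
    }

    public static void main(String[] args) {
        long size = 3_000L;
        long ts = 1_650_116_584_000L; // 2022-04-16 21:43:04 CST (UTC+8)

        // End of the window that contains ts, without and with a 2s offset
        long plainEnd = windowStart(ts, 0L, size) + size;
        long shiftedEnd = windowStart(ts, 2_000L, size) + size;

        System.out.println(plainEnd);   // prints 1650116586000, i.e. 21:43:06
        System.out.println(shiftedEnd); // prints 1650116585000, i.e. 21:43:05
    }
}
```

For an element seen at 21:43:04, the window therefore ends at 21:43:06 without the offset and at 21:43:05 with it, which matches the firing times 21:43:02, 21:43:05 and 21:43:08 in the result.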

3.Tumbling-Count-Window

(1) Data source

To tell time-based and count-based triggering apart, the source is changed to emit one element every 0.5s.

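import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class IntegerSource implements SourceFunction<Integer> {
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean running = true;
    private int i = 0;

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        while (running) {
            ctx.collect(i++);
            Thread.sleep(500);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}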

(2) Example

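import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

@Test
public void tumblingCountWindowsTest() throws Exception {
    // Note: "zzz" prints the time-zone abbreviation (e.g. CST); use "SSS" for milliseconds
    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.zzz");
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
    DataStreamSource<Integer> source = env.addSource(new IntegerSource());
    // Fire once for every 5 elements
    source.countWindowAll(5)
            .process(new ProcessAllWindowFunction<Integer, Integer, GlobalWindow>() {
                @Override
                public void process(Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {
                    int sum = 0;
                    for (Integer next : elements) {
                        sum += next;
                        System.out.println("Element: " + next + ", processing time: " + format.format(new Date()));
                    }
                    out.collect(sum);
                }
            })
            .print();
    env.execute("countWindowAll");
}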
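To make the count-based trigger concrete, the following plain-Java sketch buffers elements and emits a sum every time the buffer reaches the window size. CountWindowSketch and its add method are hypothetical names for this illustration only; Flink's own count window is implemented differently (a global window with a count trigger), so treat this as a behavioral approximation.

```java
import java.util.ArrayList;
import java.util.List;

final class CountWindowSketch {
    private final int size;
    private final List<Integer> buffer = new ArrayList<>();

    CountWindowSketch(int size) {
        this.size = size;
    }

    /** Buffers one element; returns the window's sum when the window is full, otherwise null. */
    Integer add(int element) {
        buffer.add(element);
        if (buffer.size() < size) {
            return null;
        }
        int sum = 0;
        for (int value : buffer) {
            sum += value;
        }
        buffer.clear(); // tumbling: the next window starts empty
        return sum;
    }

    public static void main(String[] args) {
        CountWindowSketch window = new CountWindowSketch(5);
        for (int i = 0; i < 10; i++) {
            Integer sum = window.add(i);
            if (sum != null) {
                System.out.println(sum); // prints 10, then 35
            }
        }
    }
}
```

The window fires purely on element count, so the arrival rate of the source only changes how often it fires, not what each window contains.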

Result of tumblingCountWindowsTest:

Element: 0, processing time: 2022-04-16 21:50:10.CST
Element: 1, processing time: 2022-04-16 21:50:10.CST
Element: 2, processing time: 2022-04-16 21:50:10.CST
Element: 3, processing time: 2022-04-16 21:50:10.CST
Element: 4, processing time: 2022-04-16 21:50:10.CST
8> 10
Element: 5, processing time: 2022-04-16 21:50:13.CST
Element: 6, processing time: 2022-04-16 21:50:13.CST
Element: 7, processing time: 2022-04-16 21:50:13.CST
Element: 8, processing time: 2022-04-16 21:50:13.CST
Element: 9, processing time: 2022-04-16 21:50:13.CST
1> 35

4.Tumbling-EventTime-Window

A tumbling window based on event time (EventTime).

(1) Data entity class

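import java.util.Date;
import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class EventElement {
    private Integer id;
    // Event time of the element, in epoch milliseconds
    private Long time;
    // The same instant as a Date, kept only for readable printing
    private Date date;
}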

(2) Data source

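import java.util.Date;
import java.util.Random;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class EventElementSource implements SourceFunction<EventElement> {
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<EventElement> ctx) throws Exception {
        int id = 0;
        Random random = new Random();
        while (running) {
            // Event time = now plus a random 0-4999 ms, so timestamps are not monotonic
            long time = new Date().getTime() + random.nextInt(5000);
            Date date = new Date(time);
            ctx.collect(new EventElement(id++, time, date));
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}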

(3) Example

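import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

@Test
public void tumblingEventTimeWindowsTest() throws Exception {
    // Note: "zzz" prints the time-zone abbreviation (e.g. CST); use "SSS" for milliseconds
    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.zzz");
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
            .setParallelism(1);
    // Add the data source
    env.addSource(new EventElementSource())
            // Assign timestamps and watermarks from the time carried in each element
            .assignTimestampsAndWatermarks(WatermarkStrategy.<EventElement>forMonotonousTimestamps()
                    .withTimestampAssigner((element, recordTimestamp) -> element.getTime()))
            // Event-time tumbling window with a length of 3s
            .windowAll(TumblingEventTimeWindows.of(Time.seconds(3)))
            .process(new ProcessAllWindowFunction<EventElement, Integer, TimeWindow>() {
                @Override
                public void process(Context context, Iterable<EventElement> elements, Collector<Integer> out) throws Exception {
                    int id = 0;
                    for (EventElement next : elements) {
                        id = next.getId();
                        System.out.println("Element: " + next + ", processing time: " + format.format(new Date()));
                    }
                    // Emit the id of the last element in this window
                    out.collect(id);
                }
            })
            .print();
    env.execute("TumblingEventTimeWindows");
}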

Result:

Data generation times:

EventElement(id=0, time=1650120291747, date=Sat Apr 16 22:44:51 CST 2022)
EventElement(id=1, time=1650120289932, date=Sat Apr 16 22:44:49 CST 2022)
EventElement(id=2, time=1650120293657, date=Sat Apr 16 22:44:53 CST 2022)
EventElement(id=3, time=1650120291129, date=Sat Apr 16 22:44:51 CST 2022)
EventElement(id=4, time=1650120291191, date=Sat Apr 16 22:44:51 CST 2022)
EventElement(id=5, time=1650120294228, date=Sat Apr 16 22:44:54 CST 2022)
EventElement(id=6, time=1650120293441, date=Sat Apr 16 22:44:53 CST 2022)
EventElement(id=7, time=1650120294474, date=Sat Apr 16 22:44:54 CST 2022)
EventElement(id=8, time=1650120296386, date=Sat Apr 16 22:44:56 CST 2022)
EventElement(id=9, time=1650120298108, date=Sat Apr 16 22:44:58 CST 2022)
EventElement(id=13, time=1650120303753, date=Sat Apr 16 22:45:03 CST 2022)
EventElement(id=14, time=1650120305941, date=Sat Apr 16 22:45:05 CST 2022)
EventElement(id=15, time=1650120306923, date=Sat Apr 16 22:45:06 CST 2022)

Data processing times:

Element: EventElement(id=0, time=1650120291747, date=Sat Apr 16 22:44:51 CST 2022), processing time: 2022-04-16 22:44:52.CST
Element: EventElement(id=2, time=1650120293657, date=Sat Apr 16 22:44:53 CST 2022), processing time: 2022-04-16 22:44:52.CST
Element: EventElement(id=3, time=1650120291129, date=Sat Apr 16 22:44:51 CST 2022), processing time: 2022-04-16 22:44:52.CST
Element: EventElement(id=4, time=1650120291191, date=Sat Apr 16 22:44:51 CST 2022), processing time: 2022-04-16 22:44:52.CST
4
Element: EventElement(id=5, time=1650120294228, date=Sat Apr 16 22:44:54 CST 2022), processing time: 2022-04-16 22:44:56.CST
Element: EventElement(id=7, time=1650120294474, date=Sat Apr 16 22:44:54 CST 2022), processing time: 2022-04-16 22:44:56.CST
Element: EventElement(id=8, time=1650120296386, date=Sat Apr 16 22:44:56 CST 2022), processing time: 2022-04-16 22:44:56.CST
8
EventElement(id=10, time=1650120297360, date=Sat Apr 16 22:44:57 CST 2022)
EventElement(id=11, time=1650120299111, date=Sat Apr 16 22:44:59 CST 2022)
EventElement(id=12, time=1650120303800, date=Sat Apr 16 22:45:03 CST 2022)
Element: EventElement(id=9, time=1650120298108, date=Sat Apr 16 22:44:58 CST 2022), processing time: 2022-04-16 22:44:59.CST
Element: EventElement(id=10, time=1650120297360, date=Sat Apr 16 22:44:57 CST 2022), processing time: 2022-04-16 22:44:59.CST
Element: EventElement(id=11, time=1650120299111, date=Sat Apr 16 22:44:59 CST 2022), processing time: 2022-04-16 22:44:59.CST
11
Element: EventElement(id=12, time=1650120303800, date=Sat Apr 16 22:45:03 CST 2022), processing time: 2022-04-16 22:45:02.CST
Element: EventElement(id=13, time=1650120303753, date=Sat Apr 16 22:45:03 CST 2022), processing time: 2022-04-16 22:45:02.CST
Element: EventElement(id=14, time=1650120305941, date=Sat Apr 16 22:45:05 CST 2022), processing time: 2022-04-16 22:45:02.CST
14

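With forMonotonousTimestamps, the watermark follows the largest timestamp seen so far, and Flink fires a window once the watermark reaches the end of that window, that is, as soon as an element belonging to a later window arrives. If the timestamps are not increasing, an element that arrives after its window has fired is dropped, so data can be lost.
1. The element with id 0 has event time 22:44:51, which opens the first window [22:44:51, 22:44:54). The element with id 1 has event time 22:44:49, which is before the first window; the watermark has already passed the window it belongs to, so it is dropped.
2. The first window fires when id 5 (22:44:54) arrives. The element with id 6 has event time 22:44:53, which is within the range of the first window, but that window has already fired and will not process it, so the element is lost.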
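The dropped elements can be reproduced without Flink. The sketch below replays the 16 event timestamps from the output above through a simplified model: 3s tumbling windows aligned to the epoch, a watermark of (largest timestamp seen) minus (allowed out-of-orderness) minus 1 ms, and the assumption that the watermark advances after every element. EventTimeWindowSketch, simulate and the maxOutOfOrderness parameter are hypothetical names for this illustration; the parameter plays the role that WatermarkStrategy.forBoundedOutOfOrderness plays in Flink, which is an addition to what the article itself uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

final class EventTimeWindowSketch {
    static final long SIZE = 3_000L; // window length in ms

    // Event timestamps of ids 0..15, copied from the output above
    static final long[] TIMESTAMPS = {
        1650120291747L, 1650120289932L, 1650120293657L, 1650120291129L,
        1650120291191L, 1650120294228L, 1650120293441L, 1650120294474L,
        1650120296386L, 1650120298108L, 1650120297360L, 1650120299111L,
        1650120303800L, 1650120303753L, 1650120305941L, 1650120306923L
    };

    /** Returns one log line per fired window and per dropped (late) element. */
    static List<String> simulate(long[] timestamps, long maxOutOfOrderness) {
        List<String> log = new ArrayList<>();
        TreeMap<Long, List<Integer>> open = new TreeMap<>(); // window start -> buffered ids
        long watermark = Long.MIN_VALUE;
        for (int id = 0; id < timestamps.length; id++) {
            long ts = timestamps[id];
            long start = ts - Math.floorMod(ts, SIZE);
            if (start + SIZE - 1 <= watermark) {
                log.add("late id=" + id); // its window has already fired
            } else {
                open.computeIfAbsent(start, k -> new ArrayList<>()).add(id);
            }
            watermark = Math.max(watermark, ts - maxOutOfOrderness - 1);
            // Fire every open window whose last millisecond the watermark has reached
            while (!open.isEmpty() && open.firstKey() + SIZE - 1 <= watermark) {
                log.add("fired ids=" + open.pollFirstEntry().getValue());
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // Monotonous watermark (no tolerance): ids 1 and 6 are dropped
        simulate(TIMESTAMPS, 0L).forEach(System.out::println);
    }
}
```

With a tolerance of 0 the sketch logs "late id=1", "fired ids=[0, 2, 3, 4]", "late id=6", "fired ids=[5, 7, 8]", "fired ids=[9, 10, 11]" and "fired ids=[12, 13, 14]", matching the last ids 4, 8, 11 and 14 printed by the job. Calling simulate(TIMESTAMPS, 5_000L) drops nothing for this data, at the cost of firing each window later.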


